今天要來用 ES 的 Ingest Pipeline 處理前面用來做 Auto complete 的 Tweeter 資料,先回顧一下:
資料的樣子
{'name': 'MaggieBreathnac',
'user_id': 487990281,
'tweet': 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
'https://t.co/byA9uSVxFc',
'tweet_id': 1244969855058677766,
'retweets': 0,
'favorites': 0,
'created': '31-Mar-2020',
'followers': 522,
'is_user_verified': False,
'geo': {'type': 'Point', 'coordinates': [52.04681279, -7.56678938]},
'coordinates': {'type': 'Point', 'coordinates': [-7.56678938, 52.04681279]},
'location': 'dublin',
'primary_location': {'type': 'Point',
'coordinates': [-7.56678938, 52.04681279]}}
希望他變成的樣子
{
"name": 'MaggieBreathnac',
"user_id": 487990281,
"tweet": 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
'https://t.co/byA9uSVxFc',
"tweet_id": 1244969855058677766,
"retweets": 0,
"favorites": 0,
"created": '31-Mar-2020',
"followers": 522,
"is_user_verified": False,
"geo": [-7.56678938, 52.04681279],
"location": 'dublin'
}
python 做了什麼處理
def data_to_es(json_file: str, index_name: str):
with open(json_file, 'r') as f:
data = json.load(f)
fields_to_rm = ['primary_location', 'coordinates']
for d in data:
for f in fields_to_rm:
if d.get(f):
d.pop(f)
if d.get('geo'):
d['geo'] = d['geo']['coordinates'].reverse()
if d.get('created'):
d['created'] = datetime.strptime(d['created'], "%d-%b-%Y")
d['_index'] = index_name
yield d
primary_location
和 coordinates
這兩個值這些轉換都蠻單純的,符合使用 ES ingest pipeline 的情境,以下是三個步驟的 Processor:
移除 primary_location
和 coordinates
這兩個值
{
"remove": {
"field": [
"primary_location",
"coordinates"
]
}
}
轉換 geo.coordinates 內的值並寫為 geo 欄位
{
"json": {
"field": "geo.coordinates",
"target_field": "geo"
}
}
b. 將 geo 欄位內的經緯度反過來
{
"script": {
"source": "ArrayList tmp = ctx[\"geo\"]; Collections.reverse(tmp); //ctx[\"geo\"]=[ctx[\"geo\"][1], ctx[\"geo\"][0]] ctx[\"geo\"] = tmp; "
}
}
解析 created 欄位為日期欄位
{
"date": {
"field": "created",
"formats": [
"dd-MMM-yyyy"
],
"target_field": "created"
}
}
這樣我們就完成了一條資料管線了,其中比較麻煩的是要的經緯度 arrayList 反序的 processor,需要理解 painless pipeline,但比起要為了這一條管線開一個環境安裝 python、維運 python script 單純多了。
這系列文章也終於告一個段落了,下一篇會來個總回顧~